package com.gp.test;

import com.alibaba.fastjson.JSON;
import com.google.common.base.Stopwatch;
import org.apache.curator.framework.CuratorFramework;
import org.apache.zookeeper.CreateMode;

import java.util.List;
import java.util.SortedSet;
import java.util.TreeSet;
import java.util.concurrent.TimeUnit;

/**
 * FIFO队列，按进入队列的顺序消费
 *
 * @author
 * @create 2017-08-12
 */
public class FIFODistributedQueue<T> implements DistributedQueue<T> {
    private final CuratorFramework curatorFramework;
    private final String queueNode;

    public FIFODistributedQueue(String queueNode) {
        this.curatorFramework = CuratorUtil.getInstance();
        this.queueNode = queueNode;
    }

    public void produce(T data) {
        try {
            if (this.curatorFramework.checkExists().forPath(this.queueNode) == null) {
                this.curatorFramework.create().withMode(CreateMode.PERSISTENT).forPath(this.queueNode);
            }
            String result = this.curatorFramework.create()
                    .withMode(CreateMode.EPHEMERAL_SEQUENTIAL)
                    .forPath(this.queueNode + "/", JSON.toJSONString(data).getBytes());
            System.out.println("produce result:" + result);
        } catch (Exception e) {
            e.printStackTrace();
        }
    }

    public void produce(List<T> datas) {
        Stopwatch stopwatch = Stopwatch.createStarted();
        if(datas == null || datas.size() == 0)
            return;
        for (T data : datas) {
            this.produce(data);
        }

//        try {
//            CuratorTransaction curatorTransaction = curatorFramework.inTransaction();
//            if(curatorTransaction.check().forPath(this.queueNode) == null) {
//                curatorTransaction.create().withMode(CreateMode.PERSISTENT).forPath(this.queueNode);
//            }
//            for (T data : datas) {
//                CuratorTransactionFinal curatorTransactionFinal = curatorTransaction.create().withMode(CreateMode.PERSISTENT_SEQUENTIAL)
//                        .forPath(this.queueNode + "/", JSON.toJSONString(data).getBytes()).and();
//                curatorTransaction = curatorTransactionFinal;
//            }
//            ((CuratorTransactionFinal)curatorTransaction).commit();
//        } catch (Exception e) {
//            e.printStackTrace();
//        }
        System.out.println("produce elapsed time " + stopwatch.elapsed(TimeUnit.MILLISECONDS));
    }

    public Node<String, T> counsume() {
        List<String> queueNodes = null;
        try {
            queueNodes = this.curatorFramework.getChildren().forPath(this.queueNode);
        } catch (Exception e) {
            e.printStackTrace();
        }
        if (queueNodes == null)
            return null;
        SortedSet<String> sortedSet = new TreeSet<String>();
        for (String queueNode : queueNodes) {
            sortedSet.add(queueNode);
        }
        if (sortedSet.isEmpty())
            return null;

        String firstNode = sortedSet.first();
        String nodeFullPath = this.queueNode + "/" + firstNode;
        try {
            byte[] bytes = this.curatorFramework.getData().forPath(nodeFullPath);
            Node node = new Node(firstNode, new String(bytes));
            this.curatorFramework.delete().forPath(nodeFullPath);
            return node;
        } catch (Exception e) {
            //e.printStackTrace();
            return null;
        }
    }
}
